feat: tedge connect to c8y mqtt service endpoint #3707
Conversation
Codecov Report: All modified and coverable lines are covered by tests ✅
Force-pushed from a332515 to 6e25aff
Given the clear vision that the Cumulocity mqtt-service will replace the MQTT core functionality, approach 1 is probably the best approach for delivering the immediate "mqtt-service support" feature, as it also offers a smooth transition to using only the mqtt-service without changing much on the device side.

Though I still think we should explore the idea of supporting a connection to any MQTT broker by allowing option 2, we could do that specific work in a different feature implementation, where a generic feature could look very similar to what is being proposed, except that you can also specify a "type":

```toml
[bridge.custom]   # where "custom" is the unique id of the mapper
type = "generic | c8y (mqtt-core) | c8y-mqtt-service | aws | az"   # type controls default mapping rules and any type-specific connection logic
device.key_path = ""
device.cert_path = ""
bridge.username = "${cert.Issuer.CN}"   # reference meta info from the device's certificate
bridge.topic_prefix = "${bridge.id}"    # reference its own id, "custom"

# control which subscriptions the bridge will subscribe to
remote_subscriptions = [
    "3"
]

# type-specific settings (e.g. for the c8y proxy)
c8y.proxy.bind.port = 8001

# future properties
pipelines_dir = "/etc/tedge/pipelines/${bridge.id}"   # specify which mapping/filtering rules should be used (once we have a generic mapper)
```
Force-pushed from 6e25aff to 9979965
tests/RobotFramework/tests/cumulocity/bridge_config/builtin_bridge.robot
```robotframework
Execute Command    tedge config set c8y.tenant_id t37070943
Execute Command    tedge config set c8y.mqtt_service.enabled true
Execute Command    tedge config set c8y.mqtt_service.topics 'sub/topic,demo/topic'
Execute Command    tedge connect c8y
```
How can I check that the connection to the MQTT service is actually established?

- Neither `tedge connect c8y` nor `tedge connect c8y --test` says a word about the MQTT service and its status.
- I broke the config on purpose: no errors are returned by `tedge connect`.

Good: the bridge status published on `te/device/main/service/mosquitto-c8y-mqtt-bridge/status/health` is working and reports broken as well as correct settings.

=> I would add a test assertion that a status message of `1` is received on this topic.
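Robot suite aside, that assertion boils down to subscribing to the health topic and waiting for a `1` payload. A minimal standalone sketch, assuming the rumqttc crate and a local broker on the default port (both assumptions, not part of this PR):

```rust
use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS};

// Hypothetical standalone check: wait for the bridge health status on the
// local broker and assert that it reports "1" (connected).
#[tokio::main]
async fn main() {
    let opts = MqttOptions::new("bridge-health-check", "localhost", 1883);
    let (client, mut eventloop) = AsyncClient::new(opts, 10);
    client
        .subscribe(
            "te/device/main/service/mosquitto-c8y-mqtt-bridge/status/health",
            QoS::AtLeastOnce,
        )
        .await
        .unwrap();
    while let Ok(event) = eventloop.poll().await {
        if let Event::Incoming(Packet::Publish(msg)) = event {
            assert_eq!(msg.payload.as_ref(), b"1", "bridge is not connected");
            break;
        }
    }
}
```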
```robotframework
Execute Command    tedge config set mqtt.bridge.built_in true
Execute Command    tedge config set c8y.tenant_id t37070943
Execute Command    tedge config set c8y.mqtt_service.enabled true
Execute Command    tedge config set c8y.mqtt_service.topics 'sub/topic,demo/topic'
```
There's something really weird with multiple subscriptions on the MQTT service: it is rejecting subscription requests with multiple topics from the built-in bridge, which is why this test case was failing. Here is the same behaviour reproduced with a `mosquitto_sub` client:

```sh
$ mosquitto_sub -v -h albin.latest.stage.c8y.io -p 9883 -i albin-123 --key device.key --cert device.crt --cafile c8y-root.crt -u t37070943 -t sub/topic -t demo/topic
All subscription requests were denied.
```

Once you limit the subscription to a single topic, it works, and this test also passes when there is only one subscription topic. But to my surprise, the mosquitto bridge with multiple subscriptions works just fine (as done in the first test case).
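If the service keeps rejecting multi-topic SUBSCRIBE packets, one workaround on the built-in bridge side would be to issue one SUBSCRIBE per topic instead of a single packet carrying several filters. A rough sketch of the difference, assuming a rumqttc-based client (illustrative, not the actual bridge code):

```rust
use rumqttc::{AsyncClient, ClientError, QoS, SubscribeFilter};

// One SUBSCRIBE packet carrying several topic filters: this is what the
// MQTT service appears to reject.
async fn subscribe_in_one_packet(
    client: &AsyncClient,
    topics: &[String],
) -> Result<(), ClientError> {
    let filters = topics
        .iter()
        .map(|t| SubscribeFilter::new(t.clone(), QoS::AtLeastOnce));
    client.subscribe_many(filters).await
}

// One SUBSCRIBE packet per topic: each request carries a single filter,
// which the service accepts.
async fn subscribe_one_by_one(
    client: &AsyncClient,
    topics: &[String],
) -> Result<(), ClientError> {
    for topic in topics {
        client.subscribe(topic.clone(), QoS::AtLeastOnce).await?;
    }
    Ok(())
}
```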
```rust
// Templates
tc.forward_from_local("#", local_prefix.clone(), "")?;
```
Unfortunately this bridge rule triggers infinite loops. This `forward_from_local` rule bridges anything received on `c8y-mqtt/#` to the cloud, and the above `forward_from_remote` rule bridges anything received on the subscribed topics to local topics with the `c8y/` prefix. This creates an infinite loop as soon as a message is received from the cloud.

But again, the mosquitto bridge somehow detects and handles these cycles, while the built-in bridge doesn't. So either we should improve the built-in bridge to do the same, or enforce one of the following rules for the mqtt service bridge:

- Use different prefixes to bridge outbound and inbound messages
- Do not bridge all published messages on `c8y-mqtt/#`, but make the user explicitly specify the publish topics to be bridged as well (see the sketch after this list)
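For the second option, the forwarding rule would iterate over a user-supplied topic list rather than bridging the whole `c8y-mqtt/#` tree. A hedged sketch reusing the `forward_from_local` call from the diff above (the `publish_topics` config key is illustrative, not an existing setting):

```rust
// Instead of: tc.forward_from_local("#", local_prefix.clone(), "")?;
// only bridge the publish topics the user explicitly configured:
for topic in &config.c8y.mqtt_service.publish_topics {
    tc.forward_from_local(topic, local_prefix.clone(), "")?;
}
```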
There is some deduplication logic we already have for the AWS shadow topics. Unfortunately there isn't a nice way to naturally avoid loops with MQTT v3, since there is no way to opt out of receiving messages you published (assuming you are subscribed to the relevant topic). So, to detect and avoid the loops, the bridge needs to know that the topic is bidirectional, then cache the relevant publish packet until we receive the message back from the broker, at which point we can immediately acknowledge the message and not forward it to the cloud.

We can probably fix things in this (relatively) simple case by telling the bridge to treat the cloud topics as bidirectional but not (directly) subscribe to them locally. For testing that this works against mosquitto, forwarding the cloud topics bidirectionally (using `tc.forward_bidirectionally`) and subscribing separately to `#` should be fine, since mosquitto will essentially ignore the double subscription (but this would be problematic with nanomq).

In short, provided we don't have complex overlapping rules, it should be quite possible to make this just work.
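The loop-breaking idea described here could be sketched roughly as follows (illustrative only, not the existing AWS shadow-topic implementation): remember each packet forwarded on a bidirectional topic, and swallow its first echo from the other broker.

```rust
use std::collections::VecDeque;

/// Packets forwarded on a bidirectional topic, awaiting their echo.
struct BidirectionalDedup {
    in_flight: VecDeque<(String, Vec<u8>)>,
}

impl BidirectionalDedup {
    /// Record a message the bridge is about to forward.
    fn on_forward(&mut self, topic: &str, payload: &[u8]) {
        self.in_flight.push_back((topic.to_owned(), payload.to_vec()));
    }

    /// Returns true if the incoming message is the echo of a message we
    /// forwarded ourselves; the caller should ack it without re-forwarding.
    fn is_echo(&mut self, topic: &str, payload: &[u8]) -> bool {
        match self
            .in_flight
            .iter()
            .position(|(t, p)| t == topic && p == payload)
        {
            Some(pos) => {
                self.in_flight.remove(pos);
                true
            }
            None => false,
        }
    }
}
```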
> the bridge needs to know the topic is bidirectional, then cache the relevant publish packet until we receive the message back from the broker, at which point we can immediately acknowledge the message and not forward it to the cloud.

That sounds fine. But to be on the safer side with that caching (especially with half-open connections), it'd be better if the cached entries had a time-to-live as well, after which they are cleared whether or not the duplicate is received from the cloud. Sure, this might lead to a late duplicate message getting resent to the cloud once more, but that should settle after a few loops once the network stabilises.
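A time-to-live on those cache entries could look like this (a sketch extending the cache idea above; the TTL value and entry layout are arbitrary):

```rust
use std::time::{Duration, Instant};

struct CachedPacket {
    topic: String,
    payload: Vec<u8>,
    inserted: Instant,
}

/// Drop entries whose echo never arrived within the TTL, e.g. because the
/// connection was half open. A late echo may then be re-forwarded once,
/// which is the accepted trade-off.
fn purge_expired(cache: &mut Vec<CachedPacket>, ttl: Duration) {
    let now = Instant::now();
    cache.retain(|entry| now.duration_since(entry.inserted) < ttl);
}
```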
> We can probably fix things in this (relatively) simple case by telling the bridge to treat the cloud topics as bidirectional but not (directly) subscribe to them locally

I didn't understand that proposal. How does a bidirectional subscription to the same topic (e.g. `demo/topic`) on both the cloud and the local broker help? Especially since the expectation on the local broker is to bridge messages on `c8y-mqtt/demo/topic` (included in `c8y-mqtt/#`) and not `demo/topic` directly. Could you explain where/how the loop is broken when a message is received first from the cloud on `demo/topic`?
If we do a bidirectional forward, this tells the bridge to break loops on those messages. It can have different prefixes on local and cloud topics, which is exactly how the AWS shadow topic works at the moment.

Because the bridge knows in advance that the topic will cause a loop, it can cache the published message when it forwards to local/cloud and then use that cached message to ignore the same message when the bridge receives it back from the target broker.

The only difference from the AWS shadow topics is that we have a wildcard subscription on local and not on the cloud, so if we add bidirectional rules to perform de-duplication for the cloud topics, we would then subscribe to `c8y-mqtt/#` on the local broker and duplicate all of those subscriptions. This isn't going to cause a problem with mosquitto, so it should be a valid way to verify that the existing logic works, but we will need to ensure we don't have duplicate subscriptions in the final version, as that will misbehave with NanoMQ.
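Put together, the testing setup described here might look like the following (signatures assumed from the calls already in this PR; `cloud_topics` stands for the configured subscription topics):

```rust
// De-duplicate the cloud topics by declaring them bidirectional...
for topic in &cloud_topics {
    tc.forward_bidirectionally(topic, local_prefix.clone(), "")?;
}
// ...while keeping the existing local wildcard forward. Mosquitto tolerates
// the resulting duplicate subscription; NanoMQ would not, so the final
// version must avoid it.
tc.forward_from_local("#", local_prefix.clone(), "")?;
```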
> Use different prefixes to bridge outbound and inbound messages

I see this as the simplest and most robust route.
But still use a common prefix to group the c8y-mqtt messages together, just adding two nested topics:

- `c8y-mqtt/in`
- `c8y-mqtt/out`
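With those two nested prefixes, the outbound and inbound rules no longer overlap, so a message can never come back on the topic it left from. A rough sketch, again reusing the rule helpers from this PR (the `forward_from_remote` signature is assumed symmetric to `forward_from_local`, and `cloud_topics` is illustrative):

```rust
// Outbound: anything published locally under c8y-mqtt/out/... goes to the cloud.
tc.forward_from_local("#", "c8y-mqtt/out/", "")?;
// Inbound: messages from the cloud arrive locally under c8y-mqtt/in/...
for topic in &cloud_topics {
    tc.forward_from_remote(topic, "c8y-mqtt/in/", "")?;
}
```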
Fixed in b25610a as @jarhodes314 suggested, as that was also a trivial one-liner.
```rust
/// MQTT service endpoint for the Cumulocity tenant, with optional port.
#[tedge_config(example = "mqtt.your-tenant.cumulocity.com:9883")]
#[tedge_config(default(from_optional_key = "c8y.url"))]
```
Is this not better derived from `c8y.mqtt`, which will likely already have the correct domain for MQTT connections (I'm assuming this is the same issue as for standard MQTT connections)?
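If so, the fix would presumably be a one-line swap of the default's source key (a sketch, assuming `c8y.mqtt` is addressable via `from_optional_key` just like `c8y.url` in the diff above):

```rust
#[tedge_config(default(from_optional_key = "c8y.mqtt"))]
```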
Resolved by 015ee3d
Robot Results
Failed Tests
```mdx
You can customize the documentation and commands shown on this page by providing relevant settings which will be reflected in the instructions. It makes it even easier to explore and use %%te%%.

<UserContextForm settings="DEVICE_ID,C8Y_URL,C8Y_TENANT_ID,C8Y_USERNAME,C8Y_PASSWORD" />
```
Accompanying tedge-docs PR that introduces the keys `C8Y_TENANT_ID` and `C8Y_PASSWORD`.
Here are some of the rough edges around this feature in its current state:
Proposed changes
Types of changes
Paste Link to the issue
Checklist

- `just prepare-dev` (once)
- `just format` as mentioned in CODING_GUIDELINES
- `just check` as mentioned in CODING_GUIDELINES

Further comments